/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package com.icee.myth.common;

import com.icee.myth.common.channelContext.HeartbeatChannelContext;
import com.icee.myth.common.messageQueue.ServerMessageQueue;
import com.icee.myth.common.message.serverMessage.Message;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

/**
 *
 * @author liuxianke
 */
public abstract class AbstractHeartbeatServer implements Server {

    private String host;    // Server的ip地址
    private int port; // Server的端口地址
    private ServerBootstrap bootstrap;
    private int totalClientNum;
    private Channel bindChannel = null;
    
    protected HeartbeatChannelContext[] channelContexts; // 记录与Client(服务器端相互间的client)连接的channel context，下标是Client的ID，Client的ID从0开始

    public AbstractHeartbeatServer() {
    }

    public void init(String host, int totalClientNum, int port, ChannelPipelineFactory pipelineFactory) {
        this.host = host;
        this.totalClientNum = totalClientNum;
        channelContexts = new HeartbeatChannelContext[totalClientNum];
        for (int i = 0; i < totalClientNum; i++) {
            channelContexts[i] = new HeartbeatChannelContext();
        }

        this.port = port;

        // Start server with Nb of active threads = 2*NB CPU + 1 as maximum.
        ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
                Runtime.getRuntime().availableProcessors() * 2 + 1);

        // Configure the server.
        bootstrap = new ServerBootstrap(factory);

        bootstrap.setPipelineFactory(pipelineFactory);

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.setOption("reuseAddress ", true);
    }

    @Override
    public void startServer() {
        // Bind and startOnlyOnce to accept incoming connections.
        bindChannel = bootstrap.bind(new InetSocketAddress(host, port));
    }

    @Override
    public void closeServer() {
        bindChannel.close();

        for (int i = 0; i < totalClientNum; i++) {
            channelContexts[i].close();
        }
    }
    
    protected void addClient(int clientId, Channel channel) {
        assert (clientId >= 0 && clientId < totalClientNum);
        // TODO: 如果gwId不在范围内应该抛异常

        channelContexts[clientId].setChannel(channel);
        restoreHeartbeat(clientId);
    }

    protected void heartbeat() {
        for (int i = 0; i < totalClientNum; i++) {
            // 向对方发送心跳消息
            channelContexts[i].write(buildServerHeartBeat());

            // 心跳计数减一
            int beatNum = channelContexts[i].heartbeat();
            // 心跳计数小于等于0表示发生故障
            if (beatNum <= 0) {
                // 当心跳计数为0时，向消息队列中产生一服务故障消息。
                if (beatNum == 0) {
                    ServerMessageQueue.queue().offer(buildClientDownMessage(i));

                    channelContexts[i].close();
                }
                // TODO: 报警，当心跳计数小于等于0后产生连续的报警
                clientDownWarning(i);
            }
        }
    }

    protected void restoreHeartbeat(int clientId) {
        assert (clientId < totalClientNum);
        channelContexts[clientId].restoreHeartbeat();
    }

    protected void clientClose(Channel channel) {
        for (int i = 0; i < totalClientNum; i++) {
            if (channelContexts[i].isChannel(channel)) {
                channelContexts[i].close();
                clientCloseWarning(i);
                return;
            }
        }
    }

    public HeartbeatChannelContext getChannelContext(int id) {
        return channelContexts[id];
    }

    public void broadcast(Object msg) {
        for(int i=0; i<totalClientNum; i++) {
            channelContexts[i].write(msg);
        }
    }

    public void flush() {
        for(int i=0; i<totalClientNum; i++) {
            channelContexts[i].flush();
        }
    }

    protected abstract Object buildServerHeartBeat();

    protected abstract Message buildClientDownMessage(int clientId);

    protected abstract void clientDownWarning(int clientId);

    protected abstract void clientCloseWarning(int clientId);
}
